1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package storm.starter.bolt;
19  
20  import backtype.storm.Config;
21  import backtype.storm.topology.BasicOutputCollector;
22  import backtype.storm.topology.OutputFieldsDeclarer;
23  import backtype.storm.tuple.Fields;
24  import backtype.storm.tuple.Tuple;
25  import backtype.storm.tuple.Values;
26  import org.testng.annotations.DataProvider;
27  import org.testng.annotations.Test;
28  import storm.starter.tools.MockTupleHelpers;
29  import storm.starter.tools.Rankings;
30  
31  import java.util.Map;
32  
33  import static org.fest.assertions.api.Assertions.assertThat;
34  import static org.mockito.Matchers.any;
35  import static org.mockito.Mockito.*;
36  
37  public class TotalRankingsBoltTest {
38  
39    private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
40    private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
41    private static final Object ANY_OBJECT = new Object();
42    private static final int ANY_TOPN = 10;
43    private static final long ANY_COUNT = 42;
44  
45    private Tuple mockRankingsTuple(Object obj, long count) {
46      Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
47      Rankings rankings = mock(Rankings.class);
48      when(tuple.getValue(0)).thenReturn(rankings);
49      return tuple;
50    }
51  
52    @DataProvider
53    public Object[][] illegalTopN() {
54      return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
55    }
56  
57    @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
58    public void negativeOrZeroTopNShouldThrowIAE(int topN) {
59      new TotalRankingsBolt(topN);
60    }
61  
62    @DataProvider
63    public Object[][] illegalEmitFrequency() {
64      return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
65    }
66  
67    @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
68    public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
69      new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
70    }
71  
72    @DataProvider
73    public Object[][] legalTopN() {
74      return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
75    }
76  
77    @Test(dataProvider = "legalTopN")
78    public void positiveTopNShouldBeOk(int topN) {
79      new TotalRankingsBolt(topN);
80    }
81  
82    @DataProvider
83    public Object[][] legalEmitFrequency() {
84      return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
85    }
86  
87    @Test(dataProvider = "legalEmitFrequency")
88    public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
89      new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
90    }
91  
92    @Test
93    public void shouldEmitSomethingIfTickTupleIsReceived() {
94      // given
95      Tuple tickTuple = MockTupleHelpers.mockTickTuple();
96      BasicOutputCollector collector = mock(BasicOutputCollector.class);
97      TotalRankingsBolt bolt = new TotalRankingsBolt();
98  
99      // when
100     bolt.execute(tickTuple, collector);
101 
102     // then
103     // verifyZeroInteractions(collector);
104     verify(collector).emit(any(Values.class));
105   }
106 
107   @Test
108   public void shouldEmitNothingIfNormalTupleIsReceived() {
109     // given
110     Tuple normalTuple = mockRankingsTuple(ANY_OBJECT, ANY_COUNT);
111     BasicOutputCollector collector = mock(BasicOutputCollector.class);
112     TotalRankingsBolt bolt = new TotalRankingsBolt();
113 
114     // when
115     bolt.execute(normalTuple, collector);
116 
117     // then
118     verifyZeroInteractions(collector);
119   }
120 
121   @Test
122   public void shouldDeclareOutputFields() {
123     // given
124     OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
125     TotalRankingsBolt bolt = new TotalRankingsBolt();
126 
127     // when
128     bolt.declareOutputFields(declarer);
129 
130     // then
131     verify(declarer, times(1)).declare(any(Fields.class));
132   }
133 
134   @Test
135   public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
136     // given
137     TotalRankingsBolt bolt = new TotalRankingsBolt();
138 
139     // when
140     Map<String, Object> componentConfig = bolt.getComponentConfiguration();
141 
142     // then
143     assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
144     Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
145     assertThat(emitFrequencyInSeconds).isGreaterThan(0);
146   }
147 }